Personalizeで未学習ユーザに対するレコメンドを試してみた
Personalizeではユーザとアイテムの過去のインタラクションログを元に、レコメンドを作成することができます。学習に含んでいないユーザのレコメンドはどのようになるのかUSER_PERSONALIZEタイプの3種類のレシピでソリューションを作成し、検証してみました。
Personalizeでレコメンドを行う流れもかいつまんで紹介してます。未学習ユーザに対するレコメンドの結果については、まとめをご覧ください。
概要
- データセット: MovieLens 100K Dataset | GroupLens
- スキーマ
- Interactions: USER_ID, ITEM_ID, RATING, TIMESTAMP, GENRE_PREFERENCE
- Users: USER_ID, AGE, GENDER
- ITEMS: ITEM_ID, GENRE
- 対象レシピ: USER_PERSONALIZEタイプの3種類(HRNN-Coldstart以外)
- やること
- データセットをダウンロード&加工&Personalizeにインポート
- 3種類のレシピでソリューション&ソリューションバージョン作成
- 各ソリューションバージョンでバッチレコメンド
- レコメンド内容確認
- 各ソリューション(レシピ)でソリューションバージョンを再作成&バッチレコメンド
- レコメンド内容確認
やってみる
試した内容をざっくりかいついまんで紹介していきます。詳細はGitHubに公開しているノートブックをご参照ください。
データ
MovieLensのデータセットを次のような形に加工して、使用します。
まずはデータセットをダウンロードし、インタラクションデータを読み込みます。
!wget -N http://files.grouplens.org/datasets/movielens/ml-100k.zip !unzip -o ml-100k.zip df = pd.read_csv('./ml-100k/u.data', sep='\t', names=['USER_ID', 'ITEM_ID', 'RATING', 'TIMESTAMP']) df
ユーザメタデータを読み込み、加工します。
users = pd.read_csv('./ml-100k/u.user', sep='|', names=[ 'USER_ID', 'AGE', 'GENDER', 'OCCUPATION', 'ZIP_CODE' ], encoding='latin-1') users.set_index('USER_ID', inplace=True) users = users[['AGE', 'GENDER']] users
アイテムメタデータも読み込み、加工します。
items = pd.read_csv('./ml-100k/u.item', sep='|', names=[ 'ITEM_ID', 'TITLE', 'RELEASE_DATE', 'VIDEO_RELEASE_DATE', 'IMDB_URL', 'UNKNOWN', 'ACTION', 'ADVENTURE', 'ANIMATION', "CHILDREN'S", 'COMEDY', 'CRIME', 'DOCUMENTARY', 'DRAMA', 'FANTASY', 'FILM-NOIR', 'HORROR', 'MUSICAL', 'MYSTERY', 'ROMANCE', 'SCI-FI', 'THRILLER', 'WAR', 'WESTERN' ], encoding='latin-1') items.set_index('ITEM_ID', inplace=True) def extract_genre(row): return '|'.join([i for i, v in row[5:].items() if v == 1 ]) items['GENRE'] = items.apply(extract_genre, axis=1) items = items[['TITLE', 'GENRE']] item_watch_count = df.groupby('ITEM_ID').size().sort_values(ascending=False) item_watch_count.name = 'watch_ct' items = items.join(item_watch_count) items
評価した映画のジャンルをランダムに取り出して、映画評価時のユーザの興味という意図でコンテキストデータとして設定します。
# 映画評価時のユーザの興味のあるジャンルをインタラクションデータに追加する(評価した映画のジャンルをランダムに抽出) df['GENRE_PREFERENCE'] = items.GENRE[df['ITEM_ID'].values].str.split('|').apply(lambda x: x[np.random.randint(len(x))]).reset_index(drop=True)
データをアップロードします。
users.to_csv(data_locations['Users']) items['GENRE'].to_csv(data_locations['Items']) # アイテムに関する情報はジャンルだけに絞る df.to_csv(data_locations['Interactions'], index=False)
Personalizeのセットアップ
データの準備ができたので、Personalizeでレコメンドができるように色々準備していきます。
まずはデータセットグループを作成します。
create_dataset_group_response = personalize.create_dataset_group( name=prefix )
スキーマを定義します。
field_definitions = { 'Interactions': [ { 'name': 'USER_ID', 'type': 'string' }, { 'name': 'ITEM_ID', 'type': 'string' }, { 'name': 'RATING', 'type': 'int' }, { 'name': 'TIMESTAMP', 'type': 'long' }, { 'name': 'GENRE_PREFERENCE', 'type': 'string', 'categorical': True } ], 'Users': [ { 'name': 'USER_ID', 'type': 'string' }, { 'name': 'AGE', 'type': 'int' }, { 'name': 'GENDER', 'type': 'string', 'categorical': True } ], 'Items': [ { 'name': 'ITEM_ID', 'type': 'string' }, { 'name': 'GENRE', 'type': 'string', 'categorical': True } ] }
データセットタイプごとに、スキーマとデータセットを作成し、データをインポートします。
dataset_types = ['Interactions', 'Users', 'Items'] dataset_import_job_arns = [] for dataset_type in dataset_types: # スキーマ作成 create_schema_response = personalize.create_schema( name=f'{prefix}-{dataset_type}', schema=json.dumps({ 'type': 'record', 'name': dataset_type, 'namespace': 'com.amazonaws.personalize.schema', 'fields': field_definitions[dataset_type], 'version': '1.0' }) ) # データセット作成 create_dataset_response = personalize.create_dataset( name=f'{prefix}-{dataset_type}', datasetType=dataset_type, datasetGroupArn=dataset_group_arn, schemaArn=create_schema_response['schemaArn'] ) # データ読み込み create_dataset_import_job_response = personalize.create_dataset_import_job( jobName=f'{prefix}-{dataset_type}-{current_dt}', datasetArn=create_dataset_response['datasetArn'], dataSource={ 'dataLocation': data_locations[dataset_type] }, roleArn=role_arn ) dataset_import_job_arns.append(create_dataset_import_job_response['datasetImportJobArn'])
ソリューションバージョンの作成
データのインポートが完了したら、レシピごとにソリューションとソリューションバージョンを作成します。
recipe_arns = [ 'arn:aws:personalize:::recipe/aws-hrnn', 'arn:aws:personalize:::recipe/aws-hrnn-metadata', 'arn:aws:personalize:::recipe/aws-popularity-count' ] solution_version_arns = [] for recipe_arn in recipe_arns: # ソリューションの作成 create_solution_response = personalize.create_solution( name=f'{prefix}-{path.basename(recipe_arn)}', datasetGroupArn=dataset_group_arn, recipeArn=recipe_arn ) solution_arn = create_solution_response['solutionArn'] # ソリューションバージョンの作成(モデルの学習) create_solution_version_response = personalize.create_solution_version( solutionArn=solution_arn ) solution_version_arns.append( create_solution_version_response['solutionVersionArn'])
ソリューションバージョンの作成が完了したら、各ソリューションバージョンの評価指標を取得します。
metrics = {} for solution_version_arn in solution_version_arns: response = personalize.get_solution_metrics( solutionVersionArn=solution_version_arn ) metrics[solution_version_arn.split('/')[-2]] = response['metrics'] pd.DataFrame.from_dict(metrics, orient='index')
レコメンド
バッチレコメンド用にレコメンド対象となるユーザID一覧ファイルを作成します。その際に、データセットに含まれていないユーザIDも追加します。
target_user_ids = list(users.index.values) # 未学習ユーザを追加 target_user_ids.append(users.index.values.max() + 1) target_user_ids.append(users.index.values.max() + 2) user_ids = [json.dumps({'userId': str(user_id)}) for user_id in target_user_ids] # アップロード bucket.Object(user_ids_json_s3_path).put(Body='\n'.join(user_ids))
ソリューションバージョンごとにバッチレコメンドジョブを作成します。
user_ids_json_s3_uri = f's3://{bucket_name}/{user_ids_json_s3_path}' batch_job_arns = [] for solution_version_arn in solution_version_arns: solution_name = solution_version_arn.split('/')[-2] response = personalize.create_batch_inference_job( jobName=f'{solution_name}-{current_dt}', solutionVersionArn=solution_version_arn, numResults=100, jobInput={ 's3DataSource': { 'path': user_ids_json_s3_uri } }, jobOutput={ 's3DataDestination': { 'path': path.join(path.dirname(user_ids_json_s3_uri), solution_name, '') } }, roleArn=role_arn ) batch_job_arns.append(response['batchInferenceJobArn'])
ジョブが完了したら、出力データを取得し、使いやすいように加工します。
def transform_recommendation(dic): return ( int(dic['input']['userId']), list(map(lambda x: int(x), dic['output']['recommendedItems'])) ) user_base_recommendations = {} recommends = {} for batch_job_arn in batch_job_arns: wait_for_batch_inference_job(batch_job_arn) response = personalize.describe_batch_inference_job(batchInferenceJobArn=batch_job_arn) job = response['batchInferenceJob'] file_s3_path = path.join( *job['jobOutput']['s3DataDestination']['path'].split('/')[3:], path.basename(job['jobInput']['s3DataSource']['path']) + '.out' ) body = bucket.Object(file_s3_path).get()['Body'].read() solution_name = job['solutionVersionArn'].split('/')[-2] recommends[solution_name] = [transform_recommendation(json.loads(ss)) for ss in body.splitlines()] user_base_recommendations[solution_name] = dict([transform_recommendation(json.loads(ss)) for ss in body.splitlines()])
レコメンド内容の確認
学習済みユーザ(ID=1)に対するレコメンドです。
未学習済みユーザ(ID=944)のレコメンドです。学習していないユーザでもレコメンドが可能なようです。 Popularity-Countレシピの場合は学習済みと同じレコメンド内容です。
未学習済みユーザ(ID=945)に対するレコメンドです。 先ほど確認したレコメンド内容と同じです。
レコメンド内容の確認(ソリューションバージョン再作成版)
ソリューションバージョンを再作成後に再レコメンドを行い、レコメンド内容を確認します。
まずは学習済みユーザ(ID=1)に対するレコメンドです。 Popularity-Countレシピ以外のレシピは前回とレコメンド内容が変化しています。
未学習済みユーザ(ID=944)のレコメンドです。 学習済みユーザと同様に、Popularity-Countレシピ以外のレシピは前回とレコメンド内容が変化しています。
まとめ
PersonalizeでのUSER_PERSONALIZE
タイプのコールドスタートを除いた3種類のレシピで、未学習のユーザに対するレコメンドがどうなるか試しました。結果は次の通りです。
レシピ | レコメンド可否 | ソリューションバージョン再作成時にレコメンド内容が変わるか | 未学習ユーザ同士でレコメンド内容は同じか | 備考 |
---|---|---|---|---|
HRNN | O | O | O | |
HRNN-Metadata | O | O | O | |
Popularity-Count | O | X | O | 全ユーザに対するレコメンド内容が同じ |
さいごに
未学習のユーザに対するレコメンドをどうするかという懸念はレコメンドを利用する時に考えることがあるかと思います。そういった時などの参考になれば嬉しいです。